{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "cc959176",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | default_exp _testing.test_utils"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a39bc80a",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | export\n",
    "\n",
    "import asyncio\n",
    "import hashlib\n",
    "import platform\n",
    "import shlex\n",
    "import signal\n",
    "import subprocess  # nosec\n",
    "import unittest\n",
    "import unittest.mock\n",
    "from contextlib import contextmanager\n",
    "from pathlib import Path\n",
    "from tempfile import TemporaryDirectory\n",
    "from typing import *\n",
    "\n",
    "import asyncer\n",
    "from IPython.display import IFrame\n",
    "\n",
    "from fastkafka._application.app import FastKafka\n",
    "from fastkafka._components._subprocess import terminate_asyncio_process\n",
    "from fastkafka._components.helpers import _import_from_string, change_dir\n",
    "from fastkafka._components.logger import get_logger"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "911a1ccc",
   "metadata": {},
   "outputs": [],
   "source": [
    "import time\n",
    "from inspect import signature\n",
    "\n",
    "import anyio\n",
    "import nest_asyncio\n",
    "import pytest\n",
    "from nbdev_mkdocs.docstring import run_examples_from_docstring\n",
    "from tqdm.notebook import tqdm, trange\n",
    "\n",
    "from fastkafka._components.logger import suppress_timestamps\n",
    "from fastkafka._helpers import consumes_messages, produce_messages\n",
    "from fastkafka._testing.apache_kafka_broker import ApacheKafkaBroker"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "7c64116e",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | notest\n",
    "\n",
    "# allows async calls in notebooks (later cells use top-level `await`)\n",
    "\n",
    "nest_asyncio.apply()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "7a2eb08b",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | export\n",
    "\n",
    "# Module-level logger used by the helpers defined in this notebook\n",
    "logger = get_logger(__name__)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "1f3eee37",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[INFO] __main__: ok\n"
     ]
    }
   ],
   "source": [
    "# Notebook-only logging setup (this cell carries no export directive)\n",
    "suppress_timestamps()\n",
    "# level=20 is presumably logging.INFO, so the message below is emitted\n",
    "logger = get_logger(__name__, level=20)\n",
    "logger.info(\"ok\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "48f69103",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | export\n",
    "\n",
    "\n",
    "def nb_safe_seed(s: str) -> Callable[[int], int]:\n",
    "    \"\"\"Builds a seed function that is unique to a notebook\n",
    "\n",
    "    The base seed is derived from the SHA-256 digest of the notebook name, so\n",
    "    the same name always produces the same seed function.\n",
    "\n",
    "    Params:\n",
    "        s: name of the notebook used to initialize the seed function\n",
    "\n",
    "    Returns:\n",
    "        A function mapping an integer offset (default 0) to the base seed\n",
    "        plus that offset\n",
    "    \"\"\"\n",
    "    digest_hex = hashlib.sha256(s.encode(\"utf-8\")).hexdigest()\n",
    "    # Keep only the last eight decimal digits of the digest\n",
    "    init_seed = int(digest_hex, 16) % 100_000_000\n",
    "\n",
    "    def _get_seed(x: int = 0, *, init_seed: int = init_seed) -> int:\n",
    "        seed_value = init_seed + x\n",
    "        return seed_value\n",
    "\n",
    "    return _get_seed"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "dfba6520",
   "metadata": {},
   "outputs": [],
   "source": [
    "seed = nb_safe_seed(\"999_test_utils\")\n",
    "\n",
    "# The offset defaults to 0 and is added to the base seed\n",
    "assert seed(0) == seed()\n",
    "assert seed(1) == seed() + 1"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "9b04d73d",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | export\n",
    "\n",
    "\n",
    "@contextmanager\n",
    "def mock_AIOKafkaProducer_send() -> Generator[unittest.mock.Mock, None, None]:\n",
    "    \"\"\"Mocks **send** method of **AIOKafkaProducer**\n",
    "\n",
    "    While the context is active, ``__main__.AIOKafkaProducer.send`` is patched\n",
    "    with a mock whose return value is a task wrapping a no-op coroutine.\n",
    "\n",
    "    Yields:\n",
    "        The mock that replaces the **send** method\n",
    "    \"\"\"\n",
    "\n",
    "    async def _noop() -> None:\n",
    "        return None\n",
    "\n",
    "    with unittest.mock.patch(\"__main__.AIOKafkaProducer.send\") as send_mock:\n",
    "        # create_task schedules the coroutine on the currently running loop\n",
    "        send_mock.return_value = asyncio.create_task(_noop())\n",
    "        yield send_mock"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "85420005",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | export\n",
    "\n",
    "\n",
    "async def run_script_and_cancel(\n",
    "    script: str,\n",
    "    *,\n",
    "    script_file: Optional[str] = None,\n",
    "    cmd: Optional[str] = None,\n",
    "    cancel_after: int = 10,\n",
    "    app_name: str = \"app\",\n",
    "    kafka_app_name: str = \"kafka_app\",\n",
    "    generate_docs: bool = False,\n",
    ") -> Tuple[int, bytes]:\n",
    "    \"\"\"\n",
    "    Runs a script and cancels it after a predefined time.\n",
    "\n",
    "    The script is written to a temporary directory and `cmd` is started there\n",
    "    as a subprocess. After `cancel_after` seconds the subprocess is signalled\n",
    "    to stop (SIGTERM, or CTRL_BREAK_EVENT on Windows) and its output collected.\n",
    "    The subprocess is also stopped if this coroutine is cancelled while waiting.\n",
    "\n",
    "    Args:\n",
    "        script: A python source code to be executed in a separate subprocess.\n",
    "        script_file: Name of the script where script source will be saved.\n",
    "            If None, 'script.py' is used.\n",
    "        cmd: Command to execute. If None, it will be set to 'python3 -m {Path(script_file).stem}'.\n",
    "        cancel_after: Number of seconds before sending SIGTERM signal.\n",
    "        app_name: Name of the app. Not used by this function at the moment.\n",
    "        kafka_app_name: Name of the Kafka app.\n",
    "        generate_docs: Flag indicating whether to generate docs.\n",
    "\n",
    "    Returns:\n",
    "        A tuple containing the exit code and combined stdout and stderr as a binary string.\n",
    "    \"\"\"\n",
    "    if script_file is None:\n",
    "        script_file = \"script.py\"\n",
    "    module_name = Path(script_file).stem\n",
    "\n",
    "    if cmd is None:\n",
    "        cmd = f\"python3 -m {module_name}\"\n",
    "\n",
    "    is_windows = platform.system() == \"Windows\"\n",
    "\n",
    "    with TemporaryDirectory() as d:\n",
    "        consumer_script = Path(d) / script_file\n",
    "\n",
    "        # Python reads source files as UTF-8 by default, so write the script as\n",
    "        # UTF-8 instead of relying on the platform's locale encoding.\n",
    "        with open(consumer_script, \"w\", encoding=\"utf-8\") as file:\n",
    "            file.write(script)\n",
    "\n",
    "        if generate_docs:\n",
    "            logger.info(f\"Generating docs for: {module_name}:{kafka_app_name}\")\n",
    "            try:\n",
    "                kafka_app: FastKafka = _import_from_string(\n",
    "                    f\"{module_name}:{kafka_app_name}\"\n",
    "                )\n",
    "                await asyncer.asyncify(kafka_app.create_docs)()\n",
    "            except Exception as e:\n",
    "                # Docs generation is best-effort: report the cause and carry on.\n",
    "                logger.warning(\n",
    "                    f\"Generating docs failed for: {module_name}:{kafka_app_name}, ignoring it for now. Error: {e!r}\"\n",
    "                )\n",
    "\n",
    "        # CTRL_BREAK_EVENT can only be sent to a process started in a new process group.\n",
    "        creationflags = subprocess.CREATE_NEW_PROCESS_GROUP if is_windows else 0  # type: ignore\n",
    "        proc = subprocess.Popen(\n",
    "            shlex.split(cmd),\n",
    "            stdout=subprocess.PIPE,\n",
    "            stderr=subprocess.STDOUT,\n",
    "            cwd=d,\n",
    "            shell=is_windows,  # nosec: [B602:subprocess_without_shell_equals_true] subprocess call - check for execution of untrusted input.\n",
    "            creationflags=creationflags,\n",
    "        )\n",
    "        try:\n",
    "            await asyncio.sleep(cancel_after)\n",
    "        finally:\n",
    "            # Also runs when this coroutine is cancelled mid-sleep, so the child\n",
    "            # is always stopped and reaped before the temporary directory is removed.\n",
    "            if is_windows:\n",
    "                proc.send_signal(signal.CTRL_BREAK_EVENT)  # type: ignore\n",
    "            else:\n",
    "                proc.terminate()\n",
    "            output, _ = proc.communicate()\n",
    "\n",
    "        return (proc.returncode, output)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "09054da6",
   "metadata": {},
   "outputs": [],
   "source": [
    "# A script that finishes before the deadline exits with code 0\n",
    "script = \"\"\"\n",
    "from time import sleep\n",
    "print(\"hello\")\n",
    "sleep({t})\n",
    "\"\"\"\n",
    "\n",
    "exit_code, output = await run_script_and_cancel(script.format(t=0), cancel_after=2)\n",
    "assert exit_code == 0, f\"{exit_code=}, {output=}\"\n",
    "assert output.decode(\"utf-8\").strip() == \"hello\", output.decode(\"utf-8\")\n",
    "\n",
    "# A script still sleeping at the deadline is stopped by the signal\n",
    "exit_code, output = await run_script_and_cancel(script.format(t=5), cancel_after=2)\n",
    "if platform.system() != \"Windows\":\n",
    "    assert exit_code < 0, f\"{exit_code=}, {output=}\"\n",
    "else:\n",
    "    # 0xC000013A == 3221225786\n",
    "    assert exit_code == 0xC000013A, f\"{exit_code=}, {output=}\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "0c8484fa",
   "metadata": {},
   "outputs": [],
   "source": [
    "# A script that exits with code 1 and prints nothing\n",
    "script = \"exit(1)\"\n",
    "\n",
    "exit_code, output = await run_script_and_cancel(script, cancel_after=1)\n",
    "\n",
    "assert exit_code == 1\n",
    "assert not output.decode(\"utf-8\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "40aaf329",
   "metadata": {},
   "outputs": [],
   "source": [
    "# stdout and stderr are captured together, in the order they were written\n",
    "script = \"\"\"\n",
    "import sys\n",
    "sys.stderr.write(\"hello from stderr\\\\n\")\n",
    "sys.stderr.flush()\n",
    "print(\"hello, exiting with exit code 0\")\n",
    "exit(0)\n",
    "\"\"\"\n",
    "\n",
    "if platform.system() == \"Windows\":\n",
    "    line_separator = \"\\r\\n\"\n",
    "else:\n",
    "    line_separator = \"\\n\"\n",
    "\n",
    "exit_code, output = await run_script_and_cancel(script, cancel_after=1)\n",
    "\n",
    "assert exit_code == 0, exit_code\n",
    "assert (\n",
    "    output.decode(\"utf-8\") == f\"hello from stderr{line_separator}hello, exiting with exit code 0{line_separator}\"\n",
    "), output.decode(\"utf-8\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "b346af63",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "ok\n"
     ]
    }
   ],
   "source": [
    "# An arbitrary exit code chosen by the script is passed through unchanged\n",
    "script = \"\"\"\n",
    "print(\"hello\\\\nexiting with exit code 143\")\n",
    "exit(143)\n",
    "\"\"\"\n",
    "\n",
    "if platform.system() == \"Windows\":\n",
    "    line_separator = \"\\r\\n\"\n",
    "else:\n",
    "    line_separator = \"\\n\"\n",
    "\n",
    "exit_code, output = await run_script_and_cancel(script, cancel_after=1)\n",
    "\n",
    "assert exit_code == 143\n",
    "assert output.decode(\"utf-8\") == f\"hello{line_separator}exiting with exit code 143{line_separator}\"\n",
    "\n",
    "print(\"ok\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "7ef6219a",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | export\n",
    "\n",
    "\n",
    "async def display_docs(docs_path: str, port: int = 4000) -> None:\n",
    "    \"\"\"\n",
    "    Serves the documentation using an HTTP server.\n",
    "\n",
    "    Starts `python3 -m http.server` inside `docs_path`, displays the served\n",
    "    page in an IFrame and then terminates the server. When the Google Colab\n",
    "    helpers can be imported, the address is taken from the Colab kernel proxy;\n",
    "    otherwise `http://localhost:{port}` is used.\n",
    "\n",
    "    Args:\n",
    "        docs_path: Path to the documentation.\n",
    "        port: Port number for the HTTP server. Defaults to 4000.\n",
    "\n",
    "    Returns:\n",
    "        None\n",
    "    \"\"\"\n",
    "    # Imported explicitly: `display` is only a builtin inside an IPython session.\n",
    "    from IPython.display import display\n",
    "\n",
    "    with change_dir(docs_path):\n",
    "        process = await asyncio.create_subprocess_exec(\n",
    "            \"python3\",\n",
    "            \"-m\",\n",
    "            \"http.server\",\n",
    "            f\"{port}\",\n",
    "            stdout=asyncio.subprocess.PIPE,\n",
    "            stderr=asyncio.subprocess.PIPE,\n",
    "        )\n",
    "        try:\n",
    "            try:\n",
    "                from google.colab.output import eval_js\n",
    "\n",
    "                proxy = eval_js(f\"google.colab.kernel.proxyPort({port})\")\n",
    "                logger.info(\"Google colab detected! Proxy adjusted.\")\n",
    "            except Exception:\n",
    "                # Not running in Colab (or the proxy lookup failed): use localhost.\n",
    "                proxy = f\"http://localhost:{port}\"\n",
    "\n",
    "            await asyncio.sleep(2)  # presumably gives the server time to start\n",
    "            display(IFrame(f\"{proxy}\", 1000, 700))  # type: ignore\n",
    "            await asyncio.sleep(2)  # presumably leaves time for the page to load\n",
    "        finally:\n",
    "            # Always stop the server, even if displaying fails or the task is cancelled.\n",
    "            await terminate_asyncio_process(process)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "5ebe5474",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Minimal HTML page used to exercise `display_docs`\n",
    "example_html = \"\"\"\n",
    "<!DOCTYPE html>\n",
    "<html>\n",
    "    <head>\n",
    "        <title>Example</title>\n",
    "    </head>\n",
    "    <body>\n",
    "        <p>This is an example of a simple HTML page with one paragraph.</p>\n",
    "    </body>\n",
    "</html>\n",
    "\"\"\"\n",
    "\n",
    "with TemporaryDirectory() as tmp:\n",
    "    with change_dir(tmp):\n",
    "        with open(Path(tmp) / \"index.html\", \"w\") as index_file:\n",
    "            index_file.write(example_html)\n",
    "        await display_docs(docs_path=tmp, port=4000)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "python3",
   "language": "python",
   "name": "python3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
